/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.accumulo.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.accumulo.values.DelReadLockValue;
import org.apache.fluo.accumulo.values.WriteValue;

/**
 * This iterator cleans up old versions and unneeded column metadata. It's intended to be used only
 * at compaction time.
 */
public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Value> {

  @VisibleForTesting
  static final String GC_TIMESTAMP_OPT = "timestamp.gc";

  private static final String ZOOKEEPER_CONNECT_OPT = "zookeeper.connect";
  private Long gcTimestamp;
  private SortedKeyValueIterator<Key, Value> source;

  private ColumnBuffer keys = new ColumnBuffer();
  private ColumnBuffer keysFiltered = new ColumnBuffer();
  private HashSet<Long> completeTxs = new HashSet<>();
  private HashSet<Long> rolledback = new HashSet<>();
  private Key curCol = new Key();
  private long truncationTime;
  private int position = 0;
  boolean isFullMajc;

  @Override
  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
      IteratorEnvironment env) throws IOException {

    if (env.getIteratorScope() == IteratorScope.scan) {
      throw new IllegalArgumentException();
    }
    this.source = source;
    isFullMajc = env.getIteratorScope() == IteratorScope.majc && env.isFullMajorCompaction();

    String oats = options.get(GC_TIMESTAMP_OPT);
    if (oats != null) {
      gcTimestamp = Long.valueOf(oats);
    } else {
      String zookeepers = options.get(ZOOKEEPER_CONNECT_OPT);
      if (zookeepers == null) {
        throw new IllegalArgumentException("A configuration item for GC iterator was not set");
      }
      gcTimestamp = ZookeeperUtil.getGcTimestamp(zookeepers);
    }
  }

  @Override
  public boolean hasTop() {
    return position < keysFiltered.size() || source.hasTop();
  }

  private void findTop() throws IOException {
    while (source.hasTop()) {
      readColMetadata();
      if (keysFiltered.size() == 0) {
        if (!consumeData()) {
          // not all data for current column was consumed
          break;
        }
      } else {
        break;
      }
    }
  }

  @Override
  public void next() throws IOException {
    if (position < keysFiltered.size() - 1) {
      position++;
      return;
    } else if (position == keysFiltered.size() - 1) {
      position++;
    } else {
      source.next();
    }

    if (consumeData()) {
      findTop();
    }
  }

  /**
   * @return true when all data columns in current column were consumed
   */
  private boolean consumeData() throws IOException {
    while (source.hasTop()
        && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
      ColumnType colType = ColumnType.from(source.getTopKey());
      long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

      if (colType == ColumnType.DATA) {
        if (ts >= truncationTime && !rolledback.contains(ts)) {
          return false;
        }
      } else {
        // TODO check if its a notify
        return false;
      }

      source.next();
    }

    return true;
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
      throws IOException {
    source.seek(range, columnFamilies, inclusive);
    findTop();
  }

  private void readColMetadata() throws IOException {

    long invalidationTime = -1;

    boolean oldestSeen = false;
    boolean sawAck = false;
    long firstWrite = -1;
    long lastReadLockDeleteTs = -1;

    truncationTime = -1;

    position = 0;
    keys.clear();
    keysFiltered.clear();
    completeTxs.clear();
    rolledback.clear();

    curCol.set(source.getTopKey());

    if (NotificationUtil.isNtfy(source.getTopKey())) {
      return;
    }

    loop: while (source.hasTop()
        && curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {

      ColumnType colType = ColumnType.from(source.getTopKey());
      long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;

      switch (colType) {
        case TX_DONE: {
          keys.add(source.getTopKey(), source.getTopValue());
          completeTxs.add(ts);
          break;
        }
        case WRITE: {
          boolean keep = false;
          boolean complete = completeTxs.contains(ts);
          byte[] val = source.getTopValue().get();
          long timePtr = WriteValue.getTimestamp(val);

          if (WriteValue.isPrimary(val) && !complete) {
            keep = true;
          }

          if (!oldestSeen) {
            if (firstWrite == -1) {
              firstWrite = ts;
            }

            if (ts < gcTimestamp) {
              oldestSeen = true;
              truncationTime = timePtr;
              if (!(WriteValue.isDelete(val) && isFullMajc)) {
                keep = true;
              }
            } else {
              keep = true;
            }
          }

          if (timePtr > invalidationTime) {
            invalidationTime = timePtr;
          }

          if (keep) {
            keys.add(source.getTopKey(), val);
          } else if (complete) {
            completeTxs.remove(ts);
          }
          break;
        }
        case DEL_LOCK: {
          boolean keep = false;
          long txDoneTs = DelLockValue.getTxDoneTimestamp(source.getTopValue().get());
          boolean complete = completeTxs.contains(txDoneTs);

          byte[] val = source.getTopValue().get();

          if (!complete && DelLockValue.isPrimary(val)) {
            keep = true;
          }

          if (DelLockValue.isRollback(val)) {
            rolledback.add(ts);
            keep |= !isFullMajc;
          }

          if (ts > invalidationTime) {
            invalidationTime = ts;
          }

          if (keep) {
            keys.add(source.getTopKey(), source.getTopValue());
          } else if (complete) {
            completeTxs.remove(txDoneTs);
          }
          break;
        }
        case RLOCK: {
          boolean keep = false;
          long rlts = ReadLockUtil.decodeTs(ts);
          boolean isDelete = ReadLockUtil.isDelete(ts);

          if (isDelete) {
            lastReadLockDeleteTs = rlts;
          }

          if (rlts > invalidationTime) {
            if (isFullMajc) {
              if (isDelete) {
                if (DelReadLockValue.isRollback(source.getTopValue().get())) {
                  // can drop rolled back read lock delete markers on any full majc, do not need to
                  // consider gcTimestamp
                  keep = false;
                } else {
                  long rlockCommitTs =
                      DelReadLockValue.getCommitTimestamp(source.getTopValue().get());
                  keep = rlockCommitTs >= gcTimestamp;
                }
              } else {
                keep = lastReadLockDeleteTs != rlts;
              }
            } else {
              // can drop deleted read lock entries.. keep the delete entry.
              keep = isDelete || lastReadLockDeleteTs != rlts;
            }
          }

          if (keep) {
            keys.add(source.getTopKey(), source.getTopValue());
          }
          break;
        }
        case LOCK: {
          if (ts > invalidationTime) {
            keys.add(source.getTopKey(), source.getTopValue());
          }
          break;
        }
        case DATA: {
          // can stop looking
          break loop;
        }
        case ACK: {
          if (!sawAck) {
            if (ts >= firstWrite) {
              keys.add(source.getTopKey(), source.getTopValue());
            }
            sawAck = true;
          }
          break;
        }

        default:
          throw new IllegalArgumentException(" unknown colType " + colType);

      }

      source.next();
    }

    keys.copyTo(keysFiltered, (timestamp -> {
      if (ColumnType.from(timestamp) == ColumnType.TX_DONE) {
        return completeTxs.contains(timestamp & ColumnConstants.TIMESTAMP_MASK);
      } else {
        return true;
      }
    }));
  }

  @Override
  public Key getTopKey() {
    if (position < keysFiltered.size()) {
      return keysFiltered.getKey(position);
    } else {
      return source.getTopKey();
    }
  }

  @Override
  public Value getTopValue() {
    if (position < keysFiltered.size()) {
      return keysFiltered.getValue(position);
    } else {
      return source.getTopValue();
    }
  }

  @Override
  public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
    throw new UnsupportedOperationException();
  }

  public static void setZookeepers(IteratorSetting gcIter, String zookeepers) {
    gcIter.addOption(ZOOKEEPER_CONNECT_OPT, zookeepers);
  }
}
